/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.predictionio.tools.export

import org.apache.predictionio.controller.Utils
import org.apache.predictionio.data.storage.EventJson4sSupport
import org.apache.predictionio.data.storage.Storage
import org.apache.predictionio.tools.Runner
import org.apache.predictionio.workflow.WorkflowContext
import org.apache.predictionio.workflow.WorkflowUtils
import org.apache.predictionio.workflow.CleanupFunctions
import grizzled.slf4j.Logging
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.json4s.Formats
import org.json4s.native.Serialization._

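/** Command-line arguments accepted by [[EventsToFile]].
  *
  * @param env        comma-separated KEY=VALUE pairs passed to Spark executors
  * @param logFile    path of the log file
  * @param appId      ID of the app whose events are exported
  * @param channel    optional channel name; when absent, the app's default channel is used
  * @param outputPath directory the exported events are written to
  * @param format     output format, either "json" or "parquet" (the default)
  * @param verbose    enable verbose logging
  * @param debug      enable debug logging
  */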
case class EventsToFileArgs(
  env: String = "",
  logFile: String = "",
  appId: Int = 0,
  channel: Option[String] = None,
  outputPath: String = "",
  format: String = "parquet",
  verbose: Boolean = false,
  debug: Boolean = false)

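/** Exports the events of an app, and optionally one of its channels, to a
  * directory of JSON text files or Parquet files using Spark. Typically
  * launched through the `pio export` command.
  */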
object EventsToFile extends Logging {
  def main(args: Array[String]): Unit = {
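    // scopt folds each parsed option into a fresh copy of the immutable
    // EventsToFileArgs; unrecognized options cause parse() below to fail.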
    val parser = new scopt.OptionParser[EventsToFileArgs]("EventsToFile") {
      opt[String]("env") action { (x, c) =>
        c.copy(env = x)
      }
      opt[String]("log-file") action { (x, c) =>
        c.copy(logFile = x)
      }
      opt[Int]("appid") action { (x, c) =>
        c.copy(appId = x)
      }
      opt[String]("channel") action { (x, c) =>
        c.copy(channel = Some(x))
      }
      opt[String]("format") action { (x, c) =>
        c.copy(format = x)
      }
      opt[String]("output") action { (x, c) =>
        c.copy(outputPath = x)
      }
      opt[Unit]("verbose") action { (_, c) =>
        c.copy(verbose = true)
      }
      opt[Unit]("debug") action { (_, c) =>
        c.copy(debug = true)
      }
    }
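    // parse() returns None (after printing the error and usage) when the
    // arguments are invalid; the export runs only on a successful parse.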
    parser.parse(args, EventsToFileArgs()) foreach { args =>
      try {
        // Resolve the optional channel name to its numeric ID via the metadata store.
        val channels = Storage.getMetaDataChannels
        val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap

        val channelId: Option[Int] = args.channel.map { ch =>
          channelMap.getOrElse(ch, {
            error(s"Channel $ch doesn't exist in this app.")
            sys.exit(1)
          })
        }

        val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")

        WorkflowUtils.modifyLogging(verbose = args.verbose)
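        // json4s formats for write() below; APISerializer renders each Event
        // in the same JSON shape the Event Server API uses.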
        @transient lazy implicit val formats: Formats = Utils.json4sDefaultFormats +
          new EventJson4sSupport.APISerializer
        val sc = WorkflowContext(
          mode = "Export",
          batch = "App ID " + args.appId + channelStr,
          executorEnv = Runner.envStringToMap(args.env))
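        // SparkSession.builder().getOrCreate() attaches to the SparkContext
        // that WorkflowContext just created rather than starting a new one.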
        val sqlSession = SparkSession.builder().getOrCreate()
        val events = Storage.getPEvents()
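        // find() returns an RDD[Event] for the app, limited to the resolved
        // channel when one was supplied.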
        val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
        val jsonStringRdd = eventsRdd.map(write(_))
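        // One JSON document per line: written as-is for "json", otherwise
        // loaded into a DataFrame and re-written as Parquet.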
        if (args.format == "json") {
          jsonStringRdd.saveAsTextFile(args.outputPath)
        } else {
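          // Spark infers the schema from the JSON lines; SaveMode.ErrorIfExists
          // refuses to overwrite an existing output path.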
          val jsonDf = sqlSession.read.json(jsonStringRdd)
          jsonDf.write.mode(SaveMode.ErrorIfExists).parquet(args.outputPath)
        }
        info(s"Events are exported to ${args.outputPath}/.")
        info("Done.")

      } finally {
        CleanupFunctions.run()
      }
    }
  }
}
